/////////////////////////////////////////////////////////////////////////////
// Original code from libhdfs3. Copyright (c) 2013 - 2014, Pivotal Inc.
// All rights reserved. Author: Zhanwei Wang
/////////////////////////////////////////////////////////////////////////////
// Modifications by Kumo Inc.
// Copyright (C) Kumo Inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//

#include <turbo/times/time.h>
#include <kmhdfs/client/pipeline.h>
#include <turbo/log/logging.h>
#include <kmhdfs/common/exception.h>
#include <kmhdfs/common/exception_internal.h>
#include <kmhdfs/client/output_stream_inter.h>
#include <kmhdfs/client/file_system_inter.h>
#include <kmhdfs/client/data_transfer_protocol_sender.h>
#include <kmhdfs/proto/datatransfer.pb.h>
#include <kmhdfs/client/fault_jector.h>

#include <inttypes.h>

namespace Hdfs {
    namespace Internal {
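        /*
         * Set up pipeline state from the session configuration without
         * contacting any datanode; subclasses such as StripedPipelineImpl
         * build the actual block stream themselves.
         */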
        PipelineImpl::PipelineImpl(const char *path, SessionConfig &conf,
                                   std::shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
                                   int replication, int64_t bytesSent, PacketPool &packetPool,
                                   std::shared_ptr<LocatedBlock> lastBlock,
                                   int64_t fileId) : checksumType(checksumType), chunkSize(chunkSize), errorIndex(-1),
                                                     replication(replication), bytesAcked(bytesSent),
                                                     bytesSent(bytesSent), packetPool(packetPool),
                                                     filesystem(filesystem), lastBlock(lastBlock),
                                                     path(path), fileId(fileId) {
            canAddDatanode = conf.canAddDatanode();
            blockWriteRetry = conf.getBlockWriteRetry();
            connectTimeout = conf.getOutputConnTimeout();
            readTimeout = conf.getOutputReadTimeout();
            writeTimeout = conf.getOutputWriteTimeout();
            clientName = filesystem->getClientName();
        }

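        /*
         * Build a pipeline and connect it immediately: for append, rebuild the
         * pipeline to the last block of the file; otherwise ask the namenode
         * for a new block and set up a fresh pipeline to it.
         *
         * A rough usage sketch (the real call sites live in the output stream
         * implementation; the names below are illustrative only):
         *
         *   PipelineImpl pipeline(false, path, conf, fs, checksumType, chunkSize,
         *                         replication, 0, packetPool, lastBlock, fileId);
         *   pipeline.send(packet);              // stream data packets
         *   pipeline.flush();                   // block until all acks arrive
         *   auto block = pipeline.close(last);  // send the last packet of the block
         */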
        PipelineImpl::PipelineImpl(bool append, const char *path, const SessionConfig &conf,
                                   std::shared_ptr<FileSystemInter> filesystem, int checksumType, int chunkSize,
                                   int replication, int64_t bytesSent, PacketPool &packetPool,
                                   std::shared_ptr<LocatedBlock> lastBlock,
                                   int64_t fileId) : checksumType(checksumType), chunkSize(chunkSize), errorIndex(-1),
                                                     replication(replication), bytesAcked(bytesSent),
                                                     bytesSent(bytesSent), packetPool(packetPool),
                                                     filesystem(filesystem), lastBlock(lastBlock),
                                                     path(path), fileId(fileId) {
            canAddDatanode = conf.canAddDatanode();
            blockWriteRetry = conf.getBlockWriteRetry();
            connectTimeout = conf.getOutputConnTimeout();
            readTimeout = conf.getOutputReadTimeout();
            writeTimeout = conf.getOutputWriteTimeout();
            clientName = filesystem->getClientName();

            if (append) {
                assert(lastBlock);
                VKLOG(290) << turbo::str_format("create pipeline for file %s to append to %s at position %" PRId64,
                                                path, lastBlock->toString().c_str(), lastBlock->getNumBytes());
                stage = PIPELINE_SETUP_APPEND;
                nodes = lastBlock->getLocations();
                storageIDs = lastBlock->getStorageIDs();
                buildForAppendOrRecovery(false);
                stage = DATA_STREAMING;
            } else {
                VKLOG(290) << turbo::str_format("create pipeline for file %s to write to a new block", path);
                stage = PIPELINE_SETUP_CREATE;
                buildForNewBlock();
                stage = DATA_STREAMING;
            }
        }

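        /*
         * Find the datanode the namenode just added to the pipeline: it is the
         * single entry of nodes that does not appear in the original list.
         * Returns its index; throws if the list sizes do not match or no new
         * node can be found.
         */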
        int PipelineImpl::findNewDatanode(const std::vector<DatanodeInfo> &original) {
            if (nodes.size() != original.size() + 1) {
                THROW(HdfsIOException, "Failed to acquire a datanode for block %s from namenode.",
                      lastBlock->toString().c_str());
            }

            for (size_t i = 0; i < nodes.size(); i++) {
                size_t j = 0;

                for (; j < original.size() && !(nodes[i] == original[j]); j++);

                if (j == original.size()) {
                    return i;
                }
            }

            THROW(HdfsIOException, "Cannot add new datanode for block %s.", lastBlock->toString().c_str());
        }

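        /*
         * Ask datanode src to copy this block's replica to the given targets
         * so that a newly added pipeline member catches up with the data
         * written so far. Blocks until the datanode acknowledges the transfer.
         */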
        void PipelineImpl::transfer(const ExtendedBlock &blk, const DatanodeInfo &src,
                                    const std::vector<DatanodeInfo> &targets, const Token &token) {
            std::shared_ptr<Socket> so(new TcpSocketImpl);
            std::shared_ptr<BufferedSocketReader> in(new BufferedSocketReaderImpl(*so));
            so->connect(src.getIpAddr().c_str(), src.getXferPort(), connectTimeout);
            DataTransferProtocolSender sender(*so, writeTimeout, src.formatAddress());
            sender.transferBlock(blk, token, clientName.c_str(), targets);
            int size = in->readVarint32(readTimeout);
            std::vector<char> buf(size);
            in->readFully(buf.data(), size, readTimeout);
            BlockOpResponseProto resp;

            if (!resp.ParseFromArray(buf.data(), size)) {
                THROW(HdfsIOException, "cannot parse datanode response from %s fro block %s.",
                      src.formatAddress().c_str(), lastBlock->toString().c_str());
            }

            if (Status::DT_PROTO_SUCCESS != resp.status()) {
                THROW(HdfsIOException, "Failed to transfer block to a new datanode %s for block %s.",
                      targets[0].formatAddress().c_str(),
                      lastBlock->toString().c_str());
            }
        }

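        /*
         * Ask the namenode for one additional datanode (avoiding the excluded
         * ones) and replicate the current block onto it via transfer().
         * Returns true on success, false on a recoverable failure; cancel,
         * close and safe-mode exceptions are rethrown.
         */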
        bool PipelineImpl::addDatanodeToPipeline(const std::vector<DatanodeInfo> &excludedNodes) {
            try {
                /*
                 * get a new datanode
                 */
                std::vector<DatanodeInfo> original = nodes;
                std::shared_ptr<LocatedBlock> lb;
                lb = filesystem->getAdditionalDatanode(path, *lastBlock, nodes, storageIDs,
                                                       excludedNodes, 1);
                nodes = lb->getLocations();
                storageIDs = lb->getStorageIDs();

                /*
                 * failed to add new datanode into pipeline.
                 */
                if (original.size() == nodes.size()) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to add new datanode into pipeline for block: %s file %s.",
                                   lastBlock->toString().c_str(), path.c_str());
                } else {
                    /*
                     * find the new datanode
                     */
                    int d = findNewDatanode(original);
                    /*
                     * in case the block transfer fails.
                     */
                    errorIndex = d;
                    /*
                     * transfer replica
                     */
                    DatanodeInfo &src = d == 0 ? nodes[1] : nodes[d - 1];
                    std::vector<DatanodeInfo> targets;
                    targets.push_back(nodes[d]);
                    KLOG(INFO) << turbo::str_format("Replicate block %s from %s to %s for file %s.",
                                                    lastBlock->toString().c_str(),
                                                    src.formatAddress().c_str(), targets[0].formatAddress().c_str(),
                                                    path.c_str());
                    transfer(*lastBlock, src, targets, lb->getToken());
                    errorIndex = -1;
                    return true;
                }
            } catch (const HdfsCanceled &e) {
                throw;
            } catch (const HdfsFileSystemClosed &e) {
                throw;
            } catch (const SafeModeException &e) {
                throw;
            } catch (const HdfsException &e) {
                std::string buffer;
                KLOG(ERROR) << turbo::str_format(
                               "Failed to add a new datanode into pipeline for block: %s file %s.\n%s",
                               lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
            }

            return false;
        }

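        /*
         * Warn if the pipeline now has fewer datanodes than the requested
         * replication; the write keeps going with the remaining nodes.
         */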
        void PipelineImpl::checkPipelineWithReplicas() {
            if (static_cast<int>(nodes.size()) < replication) {
                std::stringstream ss;
                ss.imbue(std::locale::classic());
                int size = nodes.size();

                for (int i = 0; i < size - 1; ++i) {
                    ss << nodes[i].formatAddress() << ", ";
                }

                if (nodes.empty()) {
                    ss << "Empty";
                } else {
                    ss << nodes.back().formatAddress();
                }

                KLOG(WARNING) << turbo::str_format(
                                 "the number of nodes in the pipeline is %d [%s], which is less than the expected number of replicas %d for block %s file %s",
                                 static_cast<int>(nodes.size()), ss.str().c_str(), replication,
                                 lastBlock->toString().c_str(), path.c_str());
            }
        }

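        /*
         * Rebuild the pipeline for append or recovery. Each round: drop the
         * datanode marked by errorIndex, add a replacement if allowed and
         * needed, fetch a new generation stamp and access token from the
         * namenode, and reopen the block output stream. On success the updated
         * pipeline is committed to the namenode via updatePipeline(); after
         * blockWriteRetry fruitless rounds the last exception is rethrown.
         */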
        void PipelineImpl::buildForAppendOrRecovery(bool recovery) {
            int64_t gs = 0;
            int retry = blockWriteRetry;
            exception_ptr lastException;
            std::vector<DatanodeInfo> excludedNodes;
            std::shared_ptr<LocatedBlock> lb;
            std::string buffer;

            do {
                /*
                 * Remove bad datanode from list of datanodes.
                 * If errorIndex was not set (i.e. appends), then do not remove
                 * any datanodes
                 */
                if (errorIndex >= 0) {
                    assert(lastBlock);
                    KLOG(ERROR) << turbo::str_format(
                                   "Pipeline: node %s is invalid and removed from the pipeline when %s block %s for file %s, stage = %s.",
                                   nodes[errorIndex].formatAddress().c_str(),
                                   (recovery ? "recovering" : "appending to"), lastBlock->toString().c_str(),
                                   path.c_str(), StageToString(stage));
                    excludedNodes.push_back(nodes[errorIndex]);
                    nodes.erase(nodes.begin() + errorIndex);

                    if (!storageIDs.empty()) {
                        storageIDs.erase(storageIDs.begin() + errorIndex);
                    }

                    if (nodes.empty()) {
                        THROW(HdfsIOException,
                              "Build pipeline to %s block %s failed: all datanodes are bad.",
                              (recovery ? "recover" : "append to"), lastBlock->toString().c_str());
                    }

                    errorIndex = -1;
                }

                try {
                    gs = 0;

                    /*
                     * Check whether the number of datanodes in the pipeline satisfies
                     * the replication requirement; add a new datanode if it does not.
                     */
                    if (stage != PIPELINE_SETUP_CREATE && stage != PIPELINE_CLOSE
                        && static_cast<int>(nodes.size()) < replication && canAddDatanode) {
                        if (!addDatanodeToPipeline(excludedNodes)) {
                            THROW(HdfsIOException,
                                  "Failed to add new datanode into pipeline for block: %s file %s, "
                                  "set \"output.replace-datanode-on-failure\" to \"false\" to disable this feature.",
                                  lastBlock->toString().c_str(), path.c_str());
                        }
                    }

                    if (errorIndex >= 0) {
                        continue;
                    }

                    checkPipelineWithReplicas();
                    /*
                     * Update generation stamp and access token
                     */
                    lb = filesystem->updateBlockForPipeline(*lastBlock);
                    gs = lb->getGenerationStamp();
                    /*
                     * Try to build pipeline
                     */
                    createBlockOutputStream(lb->getToken(), gs, recovery);
                    /*
                     * everything is ok, reset errorIndex.
                     */
                    errorIndex = -1;
                    lastException = exception_ptr();
                    break; //break on success
                } catch (const HdfsInvalidBlockToken &e) {
                    lastException = current_exception();
                    recovery = true;
                    KLOG(ERROR) << turbo::str_format(
                                   "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %"
                                   PRId64 ",\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
                    KLOG(INFO) << turbo::str_format("Try to recover the pipeline for block %s file %s.",
                                                    lastBlock->toString().c_str(), path.c_str());
                } catch (const HdfsTimeoutException &e) {
                    lastException = current_exception();
                    recovery = true;
                    KLOG(ERROR) << turbo::str_format(
                                   "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %"
                                   PRId64 ",\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
                    KLOG(INFO) << turbo::str_format("Try to recover the pipeline for block %s file %s.",
                                                    lastBlock->toString().c_str(), path.c_str());
                } catch (const HdfsIOException &e) {
                    lastException = current_exception();
                    /*
                     * Set the recovery flag in case we failed to create a pipeline for appending.
                     */
                    recovery = true;
                    KLOG(ERROR) << turbo::str_format(
                                   "Pipeline: Failed to build pipeline for block %s file %s, new generation stamp is %"
                                   PRId64 ",\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), gs, GetExceptionDetail(e, buffer));
                    KLOG(INFO) << turbo::str_format("Try to recover the pipeline for block %s file %s.",
                                                    lastBlock->toString().c_str(), path.c_str());
                }

                /*
                 * We don't know what happened: no datanode reported a failure, so reduce
                 * the retry count to avoid an infinite loop. This can happen when an RPC
                 * call throws HdfsIOException.
                 */
                if (errorIndex < 0) {
                    --retry;
                }
            } while (retry > 0);

            if (lastException) {
                rethrow_exception(lastException);
            }

            /*
             * Update pipeline at the namenode, non-idempotent RPC call.
             */
            lb->setPoolId(lastBlock->getPoolId());
            lb->setBlockId(lastBlock->getBlockId());
            lb->setLocations(nodes);
            lb->setStorageIDs(storageIDs);
            lb->setNumBytes(lastBlock->getNumBytes());
            lb->setOffset(lastBlock->getOffset());
            filesystem->updatePipeline(*lastBlock, *lb, nodes, storageIDs);
            lastBlock = lb;
        }

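        /*
         * Ask the namenode to allocate the next block of the file, retrying
         * NotReplicatedYetException with exponential backoff (starting at
         * 100ms, capped at 5 seconds) up to blockWriteRetry times.
         */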
        void PipelineImpl::locateNextBlock(
            const std::vector<DatanodeInfo> &excludedNodes) {
            std::chrono::milliseconds sleeptime(100);
            std::chrono::milliseconds fiveSeconds(5000);
            int retry = blockWriteRetry;

            while (true) {
                try {
                    lastBlock = filesystem->addBlock(path, lastBlock.get(),
                                                     excludedNodes, fileId);
                    assert(lastBlock);
                    return;
                } catch (const NotReplicatedYetException &e) {
                    VKLOG(300) << turbo::str_format("Got NotReplicatedYetException when trying to addBlock for block %s, "
                                                    "already retried %d times, max %d retries",
                                                    lastBlock ? lastBlock->toString().c_str() : "Null",
                                                    blockWriteRetry - retry, blockWriteRetry);

                    if (retry--) {
                        try {
                            sleep_for(sleeptime);
                        } catch (...) {
                        }

                        sleeptime *= 2;
                        sleeptime = sleeptime < fiveSeconds ? sleeptime : fiveSeconds;
                    } else {
                        throw;
                    }
                }
            }
        }

        static std::string FormatExcludedNodes(
            const std::vector<DatanodeInfo> &excludedNodes) {
            std::stringstream ss;
            ss.imbue(std::locale::classic());
            ss << "[";
            int size = excludedNodes.size();

            for (int i = 0; i < size - 1; ++i) {
                ss << excludedNodes[i].formatAddress() << ", ";
            }

            if (excludedNodes.empty()) {
                ss << "Empty";
            } else {
                ss << excludedNodes.back().formatAddress();
            }

            ss << "]";
            return ss.str();
        }

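        /*
         * Allocate a new block and build a pipeline to it. If stream setup
         * fails, the block is abandoned, the failing datanode (if known) is
         * excluded, and a fresh block is requested from the namenode.
         */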
        void PipelineImpl::buildForNewBlock() {
            int retryAllocNewBlock = 0, retry = blockWriteRetry;
            std::vector<DatanodeInfo> excludedNodes;
            std::shared_ptr<LocatedBlock> block = lastBlock;
            std::string buffer;

            do {
                errorIndex = -1;
                lastBlock = block;

                try {
                    locateNextBlock(excludedNodes);
                    lastBlock->setNumBytes(0);
                    nodes = lastBlock->getLocations();
                    storageIDs = lastBlock->getStorageIDs();
                } catch (const HdfsRpcException &e) {
                    // keep a copy: the c_str() of a temporary string would dangle
                    std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
                                   path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str(),
                                   GetExceptionDetail(e, buffer));

                    if (retryAllocNewBlock > blockWriteRetry) {
                        throw;
                    }

                    KLOG(INFO) << turbo::str_format(
                                  "Retry to allocate a new empty block for file %s, last block %s, excluded nodes %s.",
                                  path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str());
                    ++retryAllocNewBlock;
                    continue;
                } catch (const HdfsException &e) {
                    std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to allocate a new empty block for file %s, last block %s, excluded nodes %s.\n%s",
                                   path.c_str(), lastBlockName.c_str(), FormatExcludedNodes(excludedNodes).c_str(),
                                   GetExceptionDetail(e, buffer));
                    throw;
                }

                retryAllocNewBlock = 0;
                checkPipelineWithReplicas();

                if (nodes.empty()) {
                    THROW(HdfsIOException,
                          "No datanode is available to create a pipeline for block %s file %s.",
                          lastBlock->toString().c_str(), path.c_str());
                }

                try {
                    createBlockOutputStream(lastBlock->getToken(), 0, false);
                    break; //break on success
                } catch (const HdfsInvalidBlockToken &e) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to setup the pipeline for new block %s file %s.\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
                } catch (const HdfsTimeoutException &e) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to setup the pipeline for new block %s file %s.\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
                } catch (const HdfsIOException &e) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to setup the pipeline for new block %s file %s.\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
                }

                KLOG(INFO) << turbo::str_format("Abandoning block: %s for file %s.", lastBlock->toString().c_str(),
                                                path.c_str());

                try {
                    filesystem->abandonBlock(*lastBlock, path, fileId);
                } catch (const HdfsException &e) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to abandon useless block %s for file %s.\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
                    throw;
                }

                if (errorIndex >= 0) {
                    KLOG(INFO) << turbo::str_format("Excluding invalid datanode: %s for block %s for file %s",
                                                    nodes[errorIndex].formatAddress().c_str(),
                                                    lastBlock->toString().c_str(), path.c_str());
                    excludedNodes.push_back(nodes[errorIndex]);
                } else {
                    /*
                     * We don't know what happened: no datanode reported a failure, so
                     * reduce the retry count to avoid an infinite loop.
                     */
                    --retry;
                }
            } while (retry);
        }

        /*
         * The bad link node must be either empty or an "IP:PORT" pair.
         */
        void PipelineImpl::checkBadLinkFormat(const std::string &n) {
            std::string node = n;

            if (node.empty()) {
                return;
            }

            do {
                const char *host = node.data(), *port;
                size_t pos = node.find_last_of(":");

                if (pos == node.npos || pos + 1 == node.length()) {
                    break;
                }

                node[pos] = 0;
                port = node.data() + pos + 1;
                struct addrinfo hints, *addrs;
                memset(&hints, 0, sizeof(hints));
                hints.ai_family = PF_UNSPEC;
                hints.ai_socktype = SOCK_STREAM;
                hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
                char *end;
                long p = strtol(port, &end, 0);

                if (p >= 65536 || p <= 0 || end != port + strlen(port)) {
                    break;
                }

                if (getaddrinfo(host, port, &hints, &addrs)) {
                    break;
                }

                freeaddrinfo(addrs);
                return;
            } while (0);

            KLOG(FATAL) << turbo::str_format(
                           "Cannot parse the firstBadLink string %s; this is probably a bug or a protocol incompatibility.",
                           n.c_str());
            THROW(HdfsException,
                  "Cannot parse the firstBadLink string %s; this is probably a bug or a protocol incompatibility.",
                  n.c_str());
        }

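        /*
         * Connect to the first datanode and send the write-block request that
         * chains up the downstream pipeline, then parse the connect ack. On
         * failure, the firstBadLink reported by the datanode is mapped back to
         * an index in nodes so the caller knows which datanode to drop.
         */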
        void PipelineImpl::createBlockOutputStream(const Token &token, int64_t gs, bool recovery) {
            std::string firstBadLink;
            exception_ptr lastError;
            bool needWrapException = true;

            try {
                sock = std::shared_ptr<Socket>(new TcpSocketImpl);
                reader = std::shared_ptr<BufferedSocketReader>(new BufferedSocketReaderImpl(*sock));
                sock->connect(nodes[0].getIpAddr().c_str(), nodes[0].getXferPort(),
                              connectTimeout);
                std::vector<DatanodeInfo> targets;

                for (size_t i = 1; i < nodes.size(); ++i) {
                    targets.push_back(nodes[i]);
                }

                DataTransferProtocolSender sender(*sock, writeTimeout,
                                                  nodes[0].formatAddress());
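                // for append/streaming/close stages, stage | 0x1 yields the
                // matching recovery stage in the data transfer protocol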
                sender.writeBlock(*lastBlock, token, clientName.c_str(), targets,
                                  (recovery ? (stage | 0x1) : stage), nodes.size(),
                                  lastBlock->getNumBytes(), bytesSent, gs, checksumType, chunkSize);
                int size = reader->readVarint32(readTimeout);
                std::vector<char> buf(size);
                reader->readFully(buf.data(), size, readTimeout);
                BlockOpResponseProto resp;

                if (!resp.ParseFromArray(buf.data(), size)) {
                    THROW(HdfsIOException, "cannot parse datanode response from %s for block %s.",
                          nodes[0].formatAddress().c_str(), lastBlock->toString().c_str());
                }

                Status pipelineStatus = resp.status();
                firstBadLink = resp.firstbadlink();

                if (Status::DT_PROTO_SUCCESS != pipelineStatus) {
                    needWrapException = false;

                    if (Status::DT_PROTO_ERROR_ACCESS_TOKEN == pipelineStatus) {
                        THROW(HdfsInvalidBlockToken,
                              "Got access token error for connect ack with firstBadLink as %s for block %s",
                              firstBadLink.c_str(), lastBlock->toString().c_str());
                    } else {
                        THROW(HdfsIOException, "Bad connect ack with firstBadLink as %s for block %s",
                              firstBadLink.c_str(), lastBlock->toString().c_str());
                    }
                }

                return;
            } catch (...) {
                errorIndex = 0;
                lastError = current_exception();
            }

            checkBadLinkFormat(firstBadLink);

            if (!firstBadLink.empty()) {
                for (size_t i = 0; i < nodes.size(); ++i) {
                    if (nodes[i].getXferAddr() == firstBadLink) {
                        errorIndex = i;
                        break;
                    }
                }
            }

            assert(lastError);

            if (!needWrapException) {
                rethrow_exception(lastError);
            }

            try {
                rethrow_exception(lastError);
            } catch (const HdfsException &e) {
                NESTED_THROW(HdfsIOException,
                             "Cannot create block output stream for block %s, "
                             "recovery flag: %s, with last generation stamp %" PRId64 ".",
                             lastBlock->toString().c_str(), (recovery ? "true" : "false"), gs);
            }
        }

        void PipelineImpl::resend() {
            assert(stage != PIPELINE_CLOSE);

            for (size_t i = 0; i < packets.size(); ++i) {
                ConstPacketBuffer b = packets[i]->getBuffer();
                sock->writeFully(b.getBuffer(), b.getSize(), writeTimeout);
                int64_t tmp = packets[i]->getLastByteOffsetBlock();
                bytesSent = bytesSent > tmp ? bytesSent : tmp;
            }
        }

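        /*
         * Write one packet to the pipeline (heartbeats are not queued for
         * acks) and poll for responses. On an I/O error the pipeline is
         * rebuilt in recovery mode and all pending packets are resent.
         */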
        void PipelineImpl::send(std::shared_ptr<Packet> packet) {
            ConstPacketBuffer buffer = packet->getBuffer();

            if (!packet->isHeartbeat()) {
                packets.push_back(packet);
            }

            /*
             * Too many packets are pending acknowledgement; wait to avoid
             * consuming too much memory.
             */
            if (static_cast<int>(packets.size()) > packetPool.getMaxSize()) {
                waitForAcks(false);
            }

            bool failover = false;

            do {
                try {
                    if (failover) {
                        resend();
                    } else {
                        assert(sock);
                        // test bad node
                        if (FaultInjector::get().testBadWriterAtKillPos(bytesSent)) {
                            KLOG(ERROR) << turbo::str_format("testBadWriterAtKillPos, bytesSent=%" PRId64 ", bytesAcked=%" PRId64,
                                                             bytesSent, bytesAcked);
                            THROW(HdfsIOException, "bad RemoteBlockWriter");
                        }
                        sock->writeFully(buffer.getBuffer(), buffer.getSize(),
                                         writeTimeout);
                        int64_t tmp = packet->getLastByteOffsetBlock();
                        bytesSent = bytesSent > tmp ? bytesSent : tmp;
                    }

                    checkResponse(false);
                    return;
                } catch (const HdfsIOException &e) {
                    if (errorIndex < 0) {
                        errorIndex = 0;
                    }

                    sock.reset();
                }

                if (lastBlock->isStriped()) {
                    THROW(HdfsIOException, "EC block send failed");
                }

                buildForAppendOrRecovery(true);
                failover = true;

                if (stage == PIPELINE_CLOSE) {
                    assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
                    packets.clear();
                    break;
                }
            } while (true);
        }

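        /*
         * Apply a single pipeline ack. On success, advance bytesAcked and
         * retire the oldest pending packet; on failure, record the failing
         * datanode in errorIndex and throw HdfsIOException.
         */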
        void PipelineImpl::processAck(PipelineAck &ack) {
            assert(!ack.isInvalid());
            int64_t seqno = ack.getSeqno();

            if (HEART_BEAT_SEQNO == seqno) {
                return;
            }

            assert(!packets.empty());
            Packet &packet = *packets[0];

            if (ack.isSuccess()) {
                if (packet.getSeqno() != seqno) {
                    THROW(HdfsIOException,
                          "processAck: pipeline ack expecting seqno %" PRId64 " but received %" PRId64
                          " for block %s.",
                          packet.getSeqno(), seqno, lastBlock->toString().c_str());
                }

                int64_t tmp = packet.getLastByteOffsetBlock();
                bytesAcked = tmp > bytesAcked ? tmp : bytesAcked;
                assert(lastBlock);
                lastBlock->setNumBytes(bytesAcked);

                if (packet.isLastPacketInBlock()) {
                    sock.reset();
                }

                packetPool.releasePacket(packets[0]);
                packets.pop_front();
            } else {
                for (int i = ack.getNumOfReplies() - 1; i >= 0; --i) {
                    if (Status::DT_PROTO_SUCCESS != ack.getReply(i)) {
                        errorIndex = i;
                        /*
                         * Handle block token expiration the same way as HdfsIOException.
                         */
                        THROW(HdfsIOException,
                              "processAck: ack report error at node: %s for block %s.",
                              nodes[i].formatAddress().c_str(), lastBlock->toString().c_str());
                    }
                }
            }
        }

        void PipelineImpl::processResponse() {
            PipelineAck ack;
            std::vector<char> buf;
            int size = reader->readVarint32(readTimeout);
            ack.reset();
            buf.resize(size);
            reader->readFully(buf.data(), size, readTimeout);
            ack.readFrom(buf.data(), size);

            if (ack.isInvalid()) {
                THROW(HdfsIOException,
                      "processResponse: got an invalid DataStreamer packet ack for block %s",
                      lastBlock->toString().c_str());
            }

            processAck(ack);
        }

        void PipelineImpl::checkResponse(bool wait) {
            int timeout = wait ? readTimeout : 0;
            bool readable = reader->poll(timeout);

            if (readable) {
                processResponse();
            } else if (wait) {
                THROW(HdfsIOException, "Timeout when reading response for block %s, datanode %s did not respond.",
                      lastBlock->toString().c_str(),
                      nodes[0].formatAddress().c_str());
            }
        }

        void PipelineImpl::flush() {
            waitForAcks(true);
        }

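        /*
         * Drain acks from the pipeline: all of them when force is set,
         * otherwise only until the pending queue drops below the packet pool
         * limit. An I/O error triggers a pipeline rebuild and a resend of the
         * outstanding packets.
         */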
        void PipelineImpl::waitForAcks(bool force) {
            bool failover = false;

            while (!packets.empty()) {
                /*
                 * Just wait for some acks to avoid consuming too much memory.
                 */
                if (!force && static_cast<int>(packets.size()) < packetPool.getMaxSize()) {
                    return;
                }

                try {
                    if (failover) {
                        resend();
                    }

                    // test bad node
                    if (FaultInjector::get().testBadWriterAtAckPos(bytesAcked)) {
                    KLOG(ERROR) << turbo::str_format("testBadWriterAtAckPos, bytesSent=%" PRId64 ", bytesAcked=%" PRId64,
                                                     bytesSent, bytesAcked);
                        THROW(HdfsIOException, "bad RemoteBlockWriter");
                    }

                    checkResponse(true);
                    failover = false;
                } catch (const HdfsIOException &e) {
                    if (errorIndex < 0) {
                        errorIndex = 0;
                    }

                    std::string buffer;
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to flush the pipeline (index=%d) on datanode %s for block %s file %s.\n%s",
                                   errorIndex, nodes[errorIndex].formatAddress().c_str(), lastBlock->toString().c_str(),
                                   path.c_str(), GetExceptionDetail(e, buffer));
                    KLOG(INFO) << turbo::str_format("Rebuild pipeline to flush for block %s file %s.",
                                                    lastBlock->toString().c_str(), path.c_str());
                    sock.reset();
                    failover = true;
                }

                if (failover) {
                    if (lastBlock->isStriped()) {
                        failover = false;
                        THROW(HdfsIOException, "EC block ack wait failed");
                    }
                    buildForAppendOrRecovery(true);

                    if (stage == PIPELINE_CLOSE) {
                        assert(packets.size() == 1 && packets[0]->isLastPacketInBlock());
                        packets.clear();
                        break;
                    }
                }
            }
        }

        int64_t PipelineImpl::getBytesSent() {
            return bytesSent;
        }

        bool PipelineImpl::isClosed() {
            return stage == PIPELINE_CLOSE;
        }

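        /*
         * Finish the block: wait for all outstanding acks, send the supplied
         * packet flagged as the last one in the block, wait for its ack, and
         * return the block with its acknowledged length set.
         */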
        std::shared_ptr<LocatedBlock> PipelineImpl::close(std::shared_ptr<Packet> lastPacket) {
            // test bad node
            if (FaultInjector::get().testPipelineClose()) {
                KLOG(ERROR) << turbo::str_format("testPipelineClose, bytesSent=%" PRId64 ", bytesAcked=%" PRId64,
                                                 bytesSent, bytesAcked);
                THROW(HdfsIOException, "bad RemoteBlockWriter");
            }
            waitForAcks(true);
            lastPacket->setLastPacketInBlock(true);
            stage = PIPELINE_CLOSE;
            send(lastPacket);
            waitForAcks(true);
            sock.reset();
            lastBlock->setNumBytes(bytesAcked);
            VKLOG(290) << turbo::str_format("close pipeline for file %s, block %s with length %" PRId64,
                                            path.c_str(), lastBlock->toString().c_str(),
                                            lastBlock->getNumBytes());
            return lastBlock;
        }

        StripedPipelineImpl::StripedPipelineImpl(const char *path, SessionConfig &conf,
                                                 std::shared_ptr<FileSystemInter> filesystem, int checksumType,
                                                 int chunkSize,
                                                 int replication, int64_t bytesSent, PacketPool &packetPool,
                                                 std::shared_ptr<LocatedBlock> lastBlock,
                                                 int64_t fileId) : PipelineImpl(
            path, conf, filesystem, checksumType, chunkSize, replication,
            bytesSent, packetPool, lastBlock, fileId) {
            VKLOG(290) << turbo::str_format("create pipeline for file %s to write to a new block", path);
            stage = PIPELINE_SETUP_CREATE;
            buildForNewBlock(lastBlock);
            stage = DATA_STREAMING;
        }

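        /*
         * Striped (EC) variant of buildForNewBlock: the block and its
         * locations are supplied by the caller rather than allocated here,
         * and a datanode failure aborts the setup instead of excluding the
         * node and retrying with a replacement.
         */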
        void StripedPipelineImpl::buildForNewBlock(std::shared_ptr<LocatedBlock> block) {
            int retryAllocNewBlock = 0, retry = blockWriteRetry;
            std::string buffer;

            do {
                errorIndex = -1;
                lastBlock = block;

                try {
                    lastBlock->setNumBytes(0);
                    nodes = lastBlock->getLocations();
                    storageIDs = lastBlock->getStorageIDs();
                } catch (const HdfsRpcException &e) {
                    // keep a copy: the c_str() of a temporary string would dangle
                    std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to allocate a new empty block for file %s, last block %s.\n%s",
                                   path.c_str(), lastBlockName.c_str(), GetExceptionDetail(e, buffer));

                    if (retryAllocNewBlock > blockWriteRetry) {
                        throw;
                    }

                    KLOG(INFO) << turbo::str_format("Retry to allocate a new empty block for file %s, last block %s.",
                                                    path.c_str(), lastBlockName.c_str());
                    ++retryAllocNewBlock;
                    continue;
                } catch (const HdfsException &e) {
                    std::string lastBlockName = lastBlock ? lastBlock->toString() : "Null";
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to allocate a new empty block for file %s, last block %s.\n%s",
                                   path.c_str(), lastBlockName.c_str(), GetExceptionDetail(e, buffer));
                    throw;
                }

                retryAllocNewBlock = 0;
                if (nodes.empty()) {
                    THROW(HdfsIOException,
                          "No datanode is available to create a pipeline for block %s file %s.",
                          lastBlock->toString().c_str(), path.c_str());
                }

                try {
                    // test bad node
                    if (FaultInjector::get().testCreateOutputStreamFailed()) {
                        KLOG(ERROR) << turbo::str_format("testCreateOutputStreamFailed, bytesSent=%" PRId64 ", bytesAcked=%" PRId64,
                                                         bytesSent, bytesAcked);
                        THROW(HdfsIOException, "bad RemoteBlockWriter");
                    }
                    createBlockOutputStream(lastBlock->getToken(), 0, false);
                    break; //break on success
                } catch (const HdfsInvalidBlockToken &e) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to setup the pipeline for new block %s file %s.\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
                } catch (const HdfsTimeoutException &e) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to setup the pipeline for new block %s file %s.\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
                } catch (const HdfsIOException &e) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to setup the pipeline for new block %s file %s.\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
                }

                KLOG(INFO) << turbo::str_format("Abandoning block: %s for file %s.", lastBlock->toString().c_str(),
                                                path.c_str());

                try {
                    filesystem->abandonBlock(*lastBlock, path, fileId);
                } catch (const HdfsException &e) {
                    KLOG(ERROR) << turbo::str_format(
                                   "Failed to abandon useless block %s for file %s.\n%s",
                                   lastBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer));
                    throw;
                }

                if (errorIndex >= 0) {
                    KLOG(INFO) << turbo::str_format("Excluding invalid datanode: %s for block %s for file %s",
                                                    nodes[errorIndex].formatAddress().c_str(),
                                                    lastBlock->toString().c_str(), path.c_str());
                } else {
                    /*
                     * We don't know what happened: no datanode reported a failure, so
                     * reduce the retry count to avoid an infinite loop.
                     */
                    --retry;
                }

                if (lastBlock->isStriped() && errorIndex >= 0) {
                    THROW(HdfsIOException, "buildForNewBlock failed for EC block");
                }
            } while (retry);
        }
    }
}
